# Functions to control the nodes' operations.

from collections import namedtuple
import glob
import os
import shutil
import time
import logging

from ZeekControl import execute
from ZeekControl import events
from ZeekControl import util
from ZeekControl import config
from ZeekControl import install
from ZeekControl import cron
from ZeekControl import node as node_mod
from ZeekControl import cmdresult


# Build the Zeek parameters for the given node. Include the
# script for live operation if live is true.
def _make_zeek_params(node, live):
    """Build the list of command-line arguments passed to Zeek for a node.

    node is the node to build the arguments for. live tells whether the
    arguments are for live operation: only then are the "-i" option (if the
    node has an interface), the "-w" option (if the "savetraces" option is
    set) and the "zeekctl-live" prefix added.

    Returns a list of strings.
    """
    args = []

    # Look the interface up with getattr() so that a node lacking the
    # attribute is simply treated as a node without an interface.
    interface = getattr(node, "interface", None) if live else None

    if interface:
        # Interface name needs quotes so that shell doesn't interpret any
        # potential metacharacters in the name.
        args += ["-i", "'%s'" % interface]

        if config.Config.savetraces:
            args += ["-w", "trace.pcap"]

    args += ["-U", ".status"]
    args += ["-p", "zeekctl"]

    if live:
        args += ["-p", "zeekctl-live"]

    if node_mod.is_standalone(node):
        args += ["-p", "standalone"]

    for prefix in config.Config.prefixes.split(":"):
        args += ["-p", "%s" % prefix]

    args += ["-p", "%s" % node.name]

    # The order of loaded scripts is as follows:
    # 1) SitePolicyScripts (local.zeek by default) gives a common set of loaded
    #    scripts for all nodes.
    # 2) The common configuration of zeekctl is loaded via the zeekctl package.
    # 3) The distribution's default settings for node configuration are loaded
    #    from either the cluster framework or standalone scripts. At this point
    #    anything in the distribution's default per-node is overridable and any
    #    identifiers in local.zeek are able to be used (e.g. in defining
    #    a notice policy).
    # 4) Autogenerated zeekctl scripts are loaded, which may contain
    #    settings that override the previously loaded scripts.
    #    (e.g. see Log::default_rotation_interval)
    args += config.Config.sitepolicyscripts.split()
    args += ["zeekctl"]
    if node_mod.is_standalone(node):
        args += ["zeekctl/standalone"]
    else:
        args += ["base/frameworks/cluster"]

    args += ["zeekctl/auto"]

    if getattr(node, "aux_scripts", None):
        args += [node.aux_scripts]

    if config.Config.zeekargs:
        # Some args in zeekargs might contain spaces, so we cannot split it.
        args += [config.Config.zeekargs]

    return args

# Build the environment variables for the given node.
def _make_env_params(node, returnlist=False):
    """Build the environment variable assignments for the given node.

    Every assignment has the form "KEY=VALUE". CLUSTER_NODE is set to the
    node name unless the node is standalone, followed by the node's own
    env_vars in sorted order.

    If returnlist is true, a flat list is returned in which every assignment
    is preceded by "-v"; otherwise the assignments are returned as a single
    space-separated string.
    """
    assignments = []

    if not node_mod.is_standalone(node):
        assignments.append("CLUSTER_NODE=%s" % node.name)

    for (key, val) in sorted(node.env_vars.items()):
        assignments.append("%s=%s" % (key, val))

    if not returnlist:
        return " ".join(assignments)

    flattened = []
    for assignment in assignments:
        flattened.extend(("-v", assignment))

    return flattened


def fmttime(t):
    """Format the timestamp t (seconds since the epoch, as a number or a
    numeric string) as local time, using the configured "timefmt" format."""
    timefmt = config.Config.timefmt
    localtime = time.localtime(float(t))
    return time.strftime(timefmt, localtime)


class Controller:
    def __init__(self, config, ui, executor, pluginregistry):
        """Remember the objects the controller works with and regenerate the
        zeekctl-config.sh file.

        Note that within this method the "config" parameter hides the
        module of the same name.
        """
        self.config, self.ui = config, ui
        self.executor, self.pluginregistry = executor, pluginregistry

        # Shell script helpers take their settings from zeekctl-config.sh,
        # so write it out now to give them the current config values.
        install.make_zeekctl_config_sh(ui)

    def start(self, nodes):
        """Start the Zeek processes of the given nodes.

        All nodes are first flagged as expected to be running. The nodes are
        then started one node type at a time; as soon as starting one type
        leaves the results in a failed state, every node of the types not yet
        attempted is marked as failed and no further nodes are started.

        Returns a CmdResult.
        """
        results = cmdresult.CmdResult()

        loggers, manager, proxies, workers = node_mod.separate_types(nodes)

        for n in nodes:
            n.setExpectRunning(True)

        # Start nodes. Do it in the order loggers, manager, proxies, workers.
        startorder = [loggers, manager, proxies, workers]

        for (pos, group) in enumerate(startorder):
            if not group:
                continue

            self._start_nodes(group, results)

            if not results.ok:
                # Give up on everything that would have been started later.
                for pending in startorder[pos + 1:]:
                    for n in pending:
                        results.set_node_fail(n)
                return results

        return results


    # Starts the given nodes.
    def _start_nodes(self, nodes, results):
        """Start the Zeek processes of the given nodes.

        Nodes reported as running by _isrunning() are skipped. For the
        remaining nodes, crash reports are created for those flagged as
        crashed, the working directories are created, and the "start" helper
        is run. The first line of the helper's output is taken as the PID of
        the new process.

        Success or failure of each node is recorded in "results" (a
        CmdResult), which is also returned.
        """
        self.ui.info("starting %s ..." % node_mod.nodes_describe(nodes))

        filtered = []
        # Ignore nodes which are still running.
        for (node, isrunning) in self._isrunning(nodes):
            if not isrunning:
                filtered += [node]

        nodes = filtered

        # Generate crash report for any crashed nodes.
        crashed = [node for node in nodes if node.hasCrashed()]
        if crashed:
            self.ui.info("creating crash report for previously crashed nodes: %s" % ", ".join([n.name for n in crashed]))
            self._make_crash_reports(crashed)

        # Make working directories.
        dirs = [(node, node.cwd()) for node in nodes]
        # From here on "nodes" holds only the nodes whose directory exists.
        nodes = []
        for (node, success, output) in self.executor.mkdirs(dirs):
            if success:
                nodes += [node]
            else:
                self.ui.error("cannot create working directory for %s" % node.name)
                results.set_node_fail(node)

        # Start Zeek process.
        cmds = []
        for node in nodes:
            envs = []
            pin_cpu = node.pin_cpus

            # If this node isn't using CPU pinning, then use a placeholder value
            if pin_cpu == "":
                pin_cpu = -1

            # Helper arguments: "-v KEY=VALUE" pairs, the working directory,
            # the CPU to pin to, and finally the Zeek parameters.
            envs = _make_env_params(node, True)
            cmds += [(node, "start", envs + [node.cwd(), str(pin_cpu)] + _make_zeek_params(node, True))]

        # From here on "nodes" holds only the nodes for which a PID was obtained.
        nodes = []
        # Note: the shell is used to interpret the command because zeekargs
        # might contain quoted arguments.
        for (node, success, output) in self.executor.run_helper(cmds, shell=True):
            if success:
                if not output:
                    self.ui.error("failed to get PID of %s" % node.name)
                    results.set_node_fail(node)
                    continue

                # The PID is expected on the first line of the helper output.
                pidstr = output.splitlines()[0]
                try:
                    pid = int(pidstr)
                except ValueError:
                    self.ui.error("invalid PID for %s: %s" % (node.name, pidstr))
                    results.set_node_fail(node)
                    continue

                nodes += [node]
                node.setPID(pid)
            else:
                self.ui.error('cannot start %s; check output of "diag"' % node.name)
                results.set_node_fail(node)
                if output:
                    self.ui.error(output)

        # Check whether processes did indeed start up.
        hanging = []
        running = []

        for (node, success) in self._waitforzeeks(nodes, "RUNNING", 3, True):
            if success:
                running += [node]
            else:
                hanging += [node]

        # It can happen that Zeek hangs in DNS lookups at startup
        # which can take a while. At this point we already know
        # that the process has been started (_waitforzeeks ensures that).
        # If by now there is not a TERMINATED status, we assume that it
        # is doing fine and will move on to RUNNING once DNS is done.
        for (node, success) in self._waitforzeeks(hanging, "TERMINATED", 0, False):
            if success:
                self.ui.error('%s terminated immediately after starting; check output with "diag"' % node.name)
                node.clearPID()
                results.set_node_fail(node)
            else:
                self.ui.info("(%s still initializing)" % node.name)
                running += [node]

        for node in running:
            self._log_action(node, "started")
            results.set_node_success(node)

        return results

    def _isrunning(self, nodes, setcrashed=True):
        """Determine which of the given nodes have a running process.

        Returns a list of (node, isrunning) tuples. Nodes without a PID are
        reported as not running without consulting the "check-pid" helper.
        Nodes for which the helper could not be run are left out of the
        returned list.

        If setcrashed is true, every node whose recorded process turns out
        not to be running has its PID cleared and is flagged as crashed.
        """
        results = []
        pidchecks = []

        for node in nodes:
            pid = node.getPID()
            if pid:
                pidchecks.append((node, "check-pid", [str(pid)]))
            else:
                results.append((node, False))

        for (node, success, output) in self.executor.run_helper(pidchecks):
            if not success:
                # If we cannot run the helper script, then we ignore this node
                # because the process might actually be running but we can't
                # tell.
                self.ui.error("failed to run check-pid on node %s" % node.name)
                continue

            alive = output.strip() == "running"
            results.append((node, alive))

            if setcrashed and not alive:
                # Grmpf. It crashed.
                node.clearPID()
                node.setCrashed()

        return results

    def _waitforzeeks(self, nodes, status, timeout, ensurerunning):
        """Wait for the nodes' Zeek processes to reach the given status.

        The first line of each node's ".status" file is polled, with a sleep
        of one second between rounds; "timeout" is the number of rounds after
        which waiting is given up. At least one round is always performed,
        followed by a one-second sleep if nodes are still pending.

        If ensurerunning is true, nodes that _isrunning() reports as not
        running are reported as failed right away (and nodes it leaves out
        of its result are left out here as well).

        Returns a list of (node, success) tuples, where success tells whether
        the node reached the status.
        """
        # If ensurerunning is true, process must still be running.
        if ensurerunning:
            running = self._isrunning(nodes)
        else:
            running = [(node, True) for node in nodes]

        results = []

        # Determine set of nodes still to check.
        todo = {}
        for (node, isrunning) in running:
            if isrunning:
                todo[node.name] = node
            else:
                results += [(node, False)]

        while True:
            # Determine whether process is still running. We need to do this
            # before we get the state to avoid a race condition.

            nodelist = sorted(todo.values(), key=node_mod.sortnode)
            running = self._isrunning(nodelist, setcrashed=False)

            # Check nodes' .status file
            cmds = []
            for node in nodelist:
                cmds += [(node, "first-line", ["%s/.status" % node.cwd()])]

            for (node, success, output) in self.executor.run_helper(cmds):
                # No usable status line (yet); look again in the next round.
                if not success or not output:
                    continue

                # The status line is expected to have exactly two fields, the
                # first of which contains the status.
                fields = output.split()
                if len(fields) == 2:
                    if status in fields[0]:
                        # Status reached. Cool.
                        del todo[node.name]
                        results += [(node, True)]
                else:
                    # Something's wrong. We give up on that node.
                    del todo[node.name]
                    results += [(node, False)]

            for (node, isrunning) in running:
                if node.name in todo and not isrunning:
                    # Alright, a dead node's status will not change anymore.
                    del todo[node.name]
                    results += [(node, False)]

            if not todo:
                # All done.
                break

            # Wait a bit before we start over.
            time.sleep(1)

            # Timeout reached?
            timeout -= 1
            if timeout <= 0:
                break

            logging.debug("Waiting for %d node(s)...", len(todo))

        for node in todo.values():
            # These did time-out.
            results += [(node, False)]

        if todo:
            logging.debug("Timeout while waiting for %d node(s)", len(todo))

        return results

    def _log_action(self, node, action):
        if not self.config.statslogenable:
            return
        t = time.time()
        with open(self.config.statslog, "a") as out:
            out.write("%s %s action %s\n" % (t, node, action))

    # Do a "post-terminate crash" for the given nodes.
    def _make_crash_reports(self, nodes):
        for n in nodes:
            self.pluginregistry.zeekProcessDied(n)

        msg_header_backtrace = "If you want to help us debug this problem, then please forward\nthis mail to reports@zeek.org\n"

        msg_header_no_backtrace = "This crash report does not include a backtrace.  In order for crash reports\nto be useful when Zeek crashes, a backtrace is needed.\n"

        postterminate = os.path.join(self.config.scriptsdir, "post-terminate")
        cmds = [(node, postterminate, [node.type, node.cwd(), "crash"]) for node in nodes]

        for (node, success, output) in self.executor.run_cmds(cmds):
            if success:
                crashreport = output

                # Note: here it is assumed that the crash-diag script outputs
                # this string only when there's a backtrace.
                has_backtrace = "Core file: " in crashreport

                if has_backtrace:
                    msg = msg_header_backtrace + crashreport
                else:
                    msg = msg_header_no_backtrace + crashreport

                msuccess, moutput = self._sendmail("Crash report from %s" % node.name, msg)
                if not msuccess:
                    self.ui.error("error occurred while trying to send mail: %s" % moutput)
            else:
                self.ui.error("error running post-terminate for %s:\n%s" % (node.name, output))

            node.clearCrashed()

    def _sendmail(self, subject, body):
        if not self.config.sendmail:
            return True, ""

        cmd = "%s '%s'" % (os.path.join(self.config.scriptsdir, "send-mail"), subject)
        return execute.run_localcmd(cmd, inputtext=body)

    # Stop Zeek processes on nodes.
    def stop(self, nodes):
        """Stop the Zeek processes of the given nodes.

        All nodes are first flagged as not expected to be running. The nodes
        are then stopped one node type at a time; as soon as stopping one
        type leaves the results in a failed state, every node of the types
        not yet attempted is marked as failed and no further nodes are
        stopped.

        Returns a CmdResult.
        """
        results = cmdresult.CmdResult()

        loggers, manager, proxies, workers = node_mod.separate_types(nodes)

        for n in nodes:
            n.setExpectRunning(False)

        # Stop nodes. Do it in the order workers, proxies, manager, loggers
        # (the reverse of "start").
        stoporder = [workers, proxies, manager, loggers]

        for (pos, group) in enumerate(stoporder):
            if not group:
                continue

            self._stop_nodes(group, results)

            if not results.ok:
                # Give up on everything that would have been stopped later.
                for pending in stoporder[pos + 1:]:
                    for n in pending:
                        results.set_node_fail(n)
                return results

        return results

    def _stop_nodes(self, nodes, results):
        """Stop the Zeek processes of the given nodes.

        Running nodes are sent signal 15 through the "stop" helper and given
        up to "stoptimeout" polling rounds to report a TERMINATED status.
        Nodes still running after that are sent signal 9. Nodes found to be
        gone afterwards are recorded as successful in "results" (a
        CmdResult); those still running when the final wait is over are
        recorded as failed. The "post-terminate" script is then run for the
        nodes that are gone and not flagged as crashed.

        Returns "results".
        """
        self.ui.info("stopping %s ..." % node_mod.nodes_describe(nodes))

        running = []

        # Check which nodes are still running.
        for (node, isrunning) in self._isrunning(nodes):
            if isrunning:
                running += [node]
            else:
                results.set_node_success(node)

        # Generate crash report for any crashed nodes.
        crashed = [node for node in nodes if node.hasCrashed()]
        if crashed:
            self.ui.info("creating crash report for previously crashed nodes: %s" % ", ".join([n.name for n in crashed]))
            self._make_crash_reports(crashed)

        # Helper function to stop nodes with given signal.
        def stop(nodes, signal):
            cmds = []
            for node in nodes:
                cmds += [(node, "stop", [str(node.getPID()), str(signal)])]

            return self.executor.run_helper(cmds)

        # Stop nodes.
        for (node, success, output) in stop(running, 15):
            if not success:
                # Give up on this node.  Most likely either we cannot connect
                # to the host, or we don't have permission to kill the process.
                self.ui.error("unable to stop %s: %s" % (node.name, output))
                results.set_node_fail(node)
                running.remove(node)

        if running:
            time.sleep(1)

        # Check whether they terminated.
        terminated = []
        kill = []
        for (node, success) in self._waitforzeeks(running, "TERMINATED", self.config.stoptimeout, False):
            if not success:
                # Check whether it crashed during shutdown ...
                result = self._isrunning([node])
                for (node, isrunning) in result:
                    if isrunning:
                        self.ui.info("%s did not terminate ... killing ..." % node.name)
                        kill += [node]
                    else:
                        # crashed flag is set by _isrunning().
                        self.ui.info("%s crashed during shutdown" % node.name)

        if kill:
            # Kill those which did not terminate gracefully.
            stop(kill, 9)
            # Give them a bit to disappear.
            time.sleep(5)

        # Check which are still running. We check all nodes to be on the safe
        # side and give them a bit more time to finally disappear.
        timeout = 10

        todo = {}
        for node in running:
            todo[node.name] = node

        while True:

            nodelist = sorted(todo.values(), key=node_mod.sortnode)
            # setcrashed=False: a process that is gone at this point is not
            # to be flagged as crashed.
            running = self._isrunning(nodelist, setcrashed=False)

            for (node, isrunning) in running:
                if node.name in todo and not isrunning:
                    # Alright, it's gone.
                    del todo[node.name]
                    terminated += [node]
                    results.set_node_success(node)

            if not todo:
                # All done.
                break

            # Wait a bit before we start over.

            if timeout <= 0:
                break

            time.sleep(1)
            timeout -= 1

        # Whatever is left in "todo" is still running.
        for node in todo.values():
            results.set_node_fail(node)

        # Do post-terminate cleanup for those which terminated gracefully.
        cleanup = [node for node in terminated if not node.hasCrashed()]

        cmds = []
        postterminate = os.path.join(self.config.scriptsdir, "post-terminate")
        for node in cleanup:
            # Tell post-terminate whether the node had to be killed.
            crashflag = "killed" if node in kill else ""

            cmds += [(node, postterminate, [node.type, node.cwd(), crashflag])]

        for (node, success, output) in self.executor.run_cmds(cmds):
            if success:
                self._log_action(node, "stopped")
            else:
                self.ui.error("error running post-terminate for %s:\n%s" % (node.name, output))
                self._log_action(node, "stopped (failed)")

            node.clearPID()
            node.clearCrashed()

        return results


    # Output status summary for nodes.
    def status(self, nodes):
        """Collect a status summary for the given nodes.

        Returns a CmdResult holding, for every node reported by _isrunning(),
        a dictionary with the keys "name", "type", "host", "status", "pid"
        and "started". If the "statuscmdshowall" option is set, the
        dictionary also has a "peers" key with the number of peers of the
        node. Values that could not be determined are given as "???".
        """
        results = cmdresult.CmdResult()

        showall = self.config.statuscmdshowall

        if showall:
            self.ui.info("Getting process status ...")

        nodestatus = self._isrunning(nodes)
        running = []

        # For every running node, fetch the first line of its .status and
        # .startup files.
        cmds = []
        for (node, isrunning) in nodestatus:
            if isrunning:
                running += [node]
                cmds += [(node, "first-line", ["%s/.status" % node.cwd(), "%s/.startup" % node.cwd()])]

        statuses = {}
        startups = {}
        for (n, success, output) in self.executor.run_helper(cmds):
            # Line 1 of the output is taken from .status, line 2 from .startup.
            out = output.splitlines()
            try:
                val = out[0].split()[0].lower() if (success and out[0]) else "???"
            except IndexError:
                val = "???"

            statuses[n.name] = val

            try:
                val = fmttime(out[1]) if (success and out[1]) else "???"
            except (IndexError, ValueError):
                val = "???"

            startups[n.name] = val

        if showall:
            self.ui.info("Getting peer status ...")
            peers = {}
            # Only nodes whose status is "running" are asked for their peers.
            nodes = [n for n in running if statuses[n.name] == "running"]
            for (node, success, args) in self._query_peerstatus(nodes):
                if success and args:
                    peers[node.name] = []
                    for f in args[0].split():
                        if not f.startswith("peer="):
                            continue
                        # Get everything after the '=' character.
                        val = f[5:]
                        if val:
                            peers[node.name] += [val]

        for (node, isrunning) in nodestatus:
            node_info = {
                "name": node.name,
                "type": node.type,
                "host": node.host,
                "status": "stopped",
                "pid": None,
                "started": None,
            }
            if showall:
                node_info["peers"] = None

            if isrunning:
                node_info["status"] = statuses[node.name]
            elif node.hasCrashed():
                node_info["status"] = "crashed"

            if isrunning:
                node_info["pid"] = node.getPID()

                if showall:
                    if node.name in peers:
                        node_info["peers"] = len(peers[node.name])
                    else:
                        node_info["peers"] = "???"

                node_info["started"] = startups[node.name]

            results.set_node_data(node, True, node_info)

        return results

    # Check the configuration for nodes without installing first.
    def check(self, nodes):
        """Check the not yet installed configuration of the given nodes and
        return the result of _check_config()."""
        return self._check_config(nodes, installed=False, list_scripts=False)

    # Print the loaded_scripts.log for either the installed scripts
    # (if "check" is false), or the original scripts (if "check" is true).
    def scripts(self, nodes, check):
        """List the scripts loaded by the given nodes: those of the original
        scripts if check is true, those of the installed scripts otherwise.
        Returns the result of _check_config()."""
        use_installed = not check
        return self._check_config(nodes, use_installed, True)


    def _check_config(self, nodes, installed, list_scripts):
        """Run the "check-config" script for the given nodes.

        Every node gets its own, freshly created "check-config-<name>"
        directory below tmpdir, which is populated by the install module
        before the script runs and removed again afterwards. "installed" and
        "list_scripts" are handed to the script as "1" or "0".

        Returns a CmdResult with the output of the script for each node. If
        any of the preparations fails, the result is marked as not ok and
        returned right away.
        """
        results = cmdresult.CmdResult()

        workdirs = []
        for node in nodes:
            workdirs.append((node, os.path.join(self.config.tmpdir, "check-config-%s" % node.name)))

        # Give every node an empty directory to work in.
        for (node, cwd) in workdirs:
            if os.path.isdir(cwd):
                try:
                    shutil.rmtree(cwd)
                except OSError as err:
                    self.ui.error("cannot remove directory %s: %s" % (cwd, err))
                    results.ok = False
                    return results

            try:
                os.makedirs(cwd)
            except OSError as err:
                self.ui.error("cannot create temporary directory: %s" % err)
                results.ok = False
                return results

        installed_policies = "1" if installed else "0"
        print_scripts = "1" if list_scripts else "0"

        cmds = []
        for (node, cwd) in workdirs:
            env = _make_env_params(node)

            # Each step is attempted only if the previous one succeeded.
            prepared = (install.make_layout(cwd, self.ui, True)
                        and install.make_local_networks(cwd, self.ui)
                        and install.make_zeekctl_config_policy(cwd, self.ui, self.pluginregistry))
            if not prepared:
                results.ok = False
                return results

            script = os.path.join(self.config.scriptsdir, "check-config")
            zeekparams = " ".join(_make_zeek_params(node, False))
            cmd = script + " %s %s %s %s" % (installed_policies, print_scripts, cwd, zeekparams)

            cmds.append(((node, cwd), cmd + " zeekctl/check", env, None))

        for ((node, cwd), success, output) in execute.run_localcmds(cmds):
            results.set_node_output(node, success, output)
            try:
                shutil.rmtree(cwd)
            except OSError:
                # Don't bother reporting an error now.
                pass

        return results

    def _query_peerstatus(self, nodes):
        """Send a peer status request to each of the given nodes that is
        running, and return the result of events.send_events_parallel()."""
        eventlist = [
            (node, "Control::peer_status_request", [], "Control::peer_status_response")
            for (node, isrunning) in self._isrunning(nodes)
            if isrunning
        ]

        return events.send_events_parallel(eventlist, config.Config.controltopic)

    def execute_cmd(self, nodes, cmd):
        """Run the shell command "cmd" on each of the given nodes and return
        a CmdResult with the outcome and output for every node."""
        results = cmdresult.CmdResult()

        shellcmds = []
        for n in nodes:
            shellcmds.append((n, cmd))

        for (node, success, out) in self.executor.run_shell_cmds(shellcmds):
            results.set_node_output(node, success, out)

        return results

    # Clean up the working directory for nodes (flushes state).
    # If cleantmp is true, also wipes ${tmpdir}; this is done
    # even when the node is still running.
    def cleanup(self, nodes, cleantmp=False):
        """Remove and recreate the working directory of every given node
        that is not running; such nodes also get their crashed flag cleared.

        If cleantmp is true, tmpdir is removed and recreated as well, for
        running nodes too.

        Returns a CmdResult in which a node is marked as failed if any of
        its directory operations failed, and as successful otherwise.
        """
        results = cmdresult.CmdResult()

        # Names of the nodes with at least one failed directory operation.
        failed = set()

        def record_failures(cmdresults):
            for (node, status, output) in cmdresults:
                if not status:
                    failed.add(node.name)

        running = []
        notrunning = []
        for (node, on) in self._isrunning(nodes):
            if on:
                running.append(node)
            else:
                notrunning.append(node)

        for node in running:
            self.ui.info("   %s is still running, not cleaning work directory" % node)

        removed = self.executor.rmdirs([(n, n.cwd()) for n in notrunning])
        recreated = self.executor.mkdirs([(n, n.cwd()) for n in notrunning])
        record_failures(removed)
        record_failures(recreated)

        for node in notrunning:
            node.clearCrashed()

        if cleantmp:
            self.ui.info("cleaning %s ..." % self.config.tmpdir)
            tmpremoved = self.executor.rmdirs([(n, self.config.tmpdir) for n in running + notrunning])
            tmprecreated = self.executor.mkdirs([(n, self.config.tmpdir) for n in running + notrunning])
            record_failures(tmpremoved)
            record_failures(tmprecreated)

        for node in nodes:
            if node.name not in failed:
                results.set_node_success(node)
            else:
                results.set_node_fail(node)

        return results

    # Report diagnostics for nodes (e.g., stderr output).
    def diag(self, nodes):
        """Run the "crash-diag" script in the working directory of each of
        the given nodes.

        Returns a CmdResult with the output of the script for every node; if
        the script failed, the output is preceded by an error line.
        """
        results = cmdresult.CmdResult()

        script = os.path.join(self.config.scriptsdir, "crash-diag")

        cmds = []
        for node in nodes:
            cmds.append((node, script, [node.cwd()]))

        for (node, success, output) in self.executor.run_cmds(cmds):
            if success:
                results.set_node_output(node, True, output)
            else:
                errheader = "error running crash-diag for %s\n" % node.name
                results.set_node_output(node, False, errheader + output)

        return results

    def capstats(self, nodes, interval):
        """Run capstats on the given nodes for "interval" seconds.

        Returns a CmdResult whose per-node data is the dictionary of values
        reported by capstats, or {"output": <error message>} for a node that
        failed. The result is marked as not ok if it does not contain any
        node at all.
        """
        results = cmdresult.CmdResult()

        if not self.config.capstatspath:
            errmsg = 'Error: cannot run capstats because zeekctl option "capstatspath" is not defined'
            if nodes:
                # The error is not specific to a node, so attach it to the
                # first one.
                results.set_node_data(nodes[0], False, {"output": errmsg})
            else:
                # Without any node there is nothing to attach the error to;
                # flag the failure on the result itself instead of raising
                # an IndexError.
                results.ok = False
            return results

        for (node, netif, success, vals) in self.get_capstats_output(nodes, interval):
            if not success:
                vals = {"output": vals}
            results.set_node_data(node, success, vals)

        if not results.nodes:
            results.ok = False

        return results

    # Gather capstats from interfaces.
    #
    # Returns a list of tuples of the form (node, netif, success, vals)
    # where 'netif' is the network interface name used by capstats on
    # the 'node', and 'success' is a boolean indicating whether or not
    # we were able to get the data; in case there's no error, 'vals' maps
    # tags to their values (otherwise, 'vals' is an error message).
    #
    # Tags are those as returned by capstats on the command-line.
    #
    # If there is more than one node, then the results will also contain
    # one "pseudo-node" of the name "$total" with the sum of all individual
    # values.
    def get_capstats_output(self, nodes, interval):
        results = []

        # Construct a list of (node, interface) tuples, one tuple for each
        # unique (host, interface) pair.
        nodenetifs = []
        hosts = {}
        for node in nodes:
            if not node.interface:
                continue

            netif = self._capstats_interface(node)
            if not netif:
                continue

            if hosts.setdefault((node.addr, netif), node) == node:
                nodenetifs.append((node, netif))

        capstats = self.config.capstatspath
        cmds = [(node, capstats, ["-I", str(interval), "-n", "1", "-i", interface]) for (node, interface) in nodenetifs]

        outputs = self.executor.run_cmds(cmds)

        totals = {}

        for (node, success, output) in outputs:
            netif = self._capstats_interface(node)

            if output:
                # Grab the first output line, because we might log this to
                # stats.log later.
                outputline = output.splitlines()[0]

            if not success:
                if output:
                    results += [(node, netif, False, "%s: capstats failed (%s)" % (node.name, outputline))]
                else:
                    results += [(node, netif, False, "%s: cannot execute capstats" % node.name)]
                continue

            if not output:
                results += [(node, netif, False, "%s: no capstats output" % node.name)]
                continue

            fields = outputline.split()[1:]

            if not fields:
                results += [(node, netif, False, "%s: unexpected capstats output: %s" % (node.name, outputline))]
                continue

            vals = {}

            try:
                for field in fields:
                    key, val = field.split("=")
                    val = float(val)
                    vals[key] = val

                    if key in totals:
                        totals[key] += val
                    else:
                        totals[key] = val

            except ValueError:
                results += [(node, netif, False, "%s: unexpected capstats output: %s" % (node.name, outputline))]
                continue

            results += [(node, netif, True, vals)]

        # Add pseudo-node for totals when there is more than one result
        if len(results) > 1:
            results += [(node_mod.Node(self.config, "$total"), None, True, totals)]

        return results


    # Convert a Zeek network interface name to one that capstats can use.
    def _capstats_interface(self, node):
        netif = node.interface

        if netif.startswith("dnacl") and netif.count("@") == 1:
            # PF_RING+DNA with pfdnacluster_master is being used
            # (e.g. interface name "dnacluster:21" gets changed to
            # "dnacluster:21@1" by the zeekctl pf_ring plugin)
            netif = netif.split("@", 1)[0]

        elif "::" in netif:
            # Interface name has packet source prefix (e.g. "af_packet::eth0"),
            # so don't try to run capstats on this interface unless it is
            # af_packet since we know that works.

            if netif.startswith("af_packet"):
                netif = netif.split("::")[1]
            else:
                netif = None

        return netif

    # Gets disk space on all volumes relevant to zeekctl installation.
    # Returns a CmdResult in which the data of each node is a dictionary
    # that maps a filesystem name to a DiskInfo named tuple (fs, total,
    # used, available, percent). If an error is encountered, the dictionary
    # has a "FAIL" key whose value is the error message.
    def df(self, nodes):
        """Get the disk space of the volumes holding the zeekctl directories
        on the given nodes.

        Returns a CmdResult in which the data of each node is a dictionary
        that maps a filesystem name to a DiskInfo named tuple (fs, total,
        used, available, percent). If an error is encountered for a node,
        its dictionary has a "FAIL" key with the error message and the node
        is marked as failed.
        """
        results = cmdresult.CmdResult()

        DiskInfo = namedtuple("DiskInfo", ("fs", "total", "used", "available", "percent"))
        dirs = ("logdir", "bindir", "helperdir", "cfgdir", "spooldir",
                "policydir", "libdir", "libdir64", "tmpdir", "staticdir", "scriptsdir")

        df = {}
        for node in nodes:
            df[node.name] = {}

        cmds = []
        for node in nodes:
            for key in dirs:
                if key == "logdir" and not (node_mod.is_logger(node) or node_mod.is_manager(node) or node_mod.is_standalone(node)):
                    # Don't need to check this on nodes that don't write logs.
                    continue

                path = self.config.config[key]

                # These two directories are skipped when they do not exist
                # (the check is made on the local host).
                if key == "libdir" or key == "libdir64":
                    if not os.path.exists(path):
                        continue

                cmds += [(node, "df", [path])]

        for (node, success, output) in self.executor.run_helper(cmds):
            if success:
                fields = output.split()
                if len(fields) != 4:
                    df[node.name]["FAIL"] = "wrong number of fields from df helper"
                    continue

                fs = fields[0]
                # Ignore NFS mounted volumes.
                if not fs.startswith("/") and ":" in fs:
                    continue

                try:
                    total = float(fields[1])
                    used = float(fields[2])
                    avail = float(fields[3])
                except ValueError as err:
                    df[node.name]["FAIL"] = "bad output from df helper: %s" % err
                    continue

                # A volume reporting neither used nor available space would
                # otherwise cause a division by zero; report 0% for it.
                usable = used + avail
                perc = used * 100.0 / usable if usable else 0.0
                df[node.name][fs] = DiskInfo(fs, total, used, avail, perc)
            else:
                df[node.name]["FAIL"] = output if output else "no output"

        for node in nodes:
            success = "FAIL" not in df[node.name]
            results.set_node_data(node, success, df[node.name])

        return results

    # Returns a list of tuples of the form (node, error, vals) where 'error' is
    # an error message string, or None if there was no error.  'vals' is a
    # dict which maps tags to their values.  Tags are "pid", "vsize",
    # "rss", "cpu", and "cmd".
    def get_top_output(self, nodes):
        """Run the "top" helper and extract the Zeek process info per node.

        Returns a list of (node, error, vals) tuples.  'error' is an error
        message string, or None on success.  'vals' is a dict with the keys
        "pid", "vsize", "rss", "cpu" and "cmd" on success, and an empty dict
        otherwise.  Nodes that are not running are reported first; the
        running nodes follow in the order given by 'nodes'.
        """
        report = []

        # Get all the PIDs first.
        pid_by_name = {}
        for (node, alive) in self._isrunning(nodes):
            if not alive:
                report.append((node, "not running", {}))
            else:
                pid_by_name[node.name] = node.getPID()

        if not pid_by_name:
            return report

        # Now run top once per host.  Walk 'nodes' again to keep the order.
        seen_hosts = set()
        helper_cmds = []
        for node in nodes:
            if node.name in pid_by_name and node.host not in seen_hosts:
                seen_hosts.add(node.host)
                helper_cmds.append((node, "top", []))

        if not helper_cmds:
            return report

        by_host = {}
        for (helper_node, ok, text) in self.executor.run_helper(helper_cmds):
            by_host[helper_node.host] = (ok, text)

        # Gather results for all the nodes that are running.
        for node in nodes:
            if node.name not in pid_by_name:
                continue

            ok, text = by_host[node.host]

            if not ok:
                # The error msg gets written to stats.log, so we only want
                # the first line.
                first_line = text.splitlines()[0] if text else ""
                report.append((node, "top failed: %s" % first_line, {}))
                continue

            if not text:
                report.append((node, "no output from top", {}))
                continue

            # Look for the line of the "top" helper output whose first field
            # is the PID of this node's zeek process.
            wanted_pid = pid_by_name[node.name]
            try:
                for row in (line.split() for line in text.splitlines()):
                    if int(row[0]) == wanted_pid:
                        fields = row
                        break
                else:
                    fields = []
            except (IndexError, ValueError) as err:
                report.append((node, "bad output from top: %s" % err, {}))
                continue

            if not fields:
                # It's possible that the process is no longer there.
                report.append((node, "not running", {}))
                continue

            try:
                vals = {
                    "pid": int(fields[0]),
                    # Sizes may be something like 2.17684e+9.
                    "vsize": int(float(fields[1])),
                    "rss": int(float(fields[2])),
                    "cpu": fields[3],
                    "cmd": " ".join(fields[4:]),
                }
            except (IndexError, ValueError) as err:
                report.append((node, "unexpected top output: %s" % err, {}))
                continue

            report.append((node, None, vals))

        return report

    # Produce a top-like output for node's processes.
    def top(self, nodes):
        """Produce a top-like result for the given nodes' processes.

        Returns a CmdResult in which every node reported by
        get_top_output() has a data dict {"procs": info}.  'info' always
        carries the node's name, type and host; on success the process
        values are filled in, on failure only "error" is set and the node
        is marked as failed.
        """
        results = cmdresult.CmdResult()

        for (node, error, vals) in self.get_top_output(nodes):
            procs = dict(name=node.name, type=node.type, host=node.host,
                         pid=None, vsize=None, rss=None, cpu=None,
                         cmd=None, error=None)

            if error:
                procs["error"] = error
            else:
                procs.update(vals)

            results.set_node_data(node, not error, {"procs": procs})

        return results

    def print_id(self, nodes, id):
        """Ask every running node for the value of the identifier 'id'.

        A "Control::id_value_request" event is sent to each running node.
        Returns a CmdResult with one output entry per answering node: the
        response arguments joined by newlines on success, or whatever the
        event layer returned on failure.  If no node is running, a single
        failure is recorded for the first node.
        """
        results = cmdresult.CmdResult()

        requests = [
            (node, "Control::id_value_request", [id], "Control::id_value_response")
            for (node, isrunning) in self._isrunning(nodes)
            if isrunning
        ]

        if not requests:
            results.set_node_output(nodes[0], False, "no running instances of Zeek")
            return results

        for (node, success, args) in events.send_events_parallel(requests, config.Config.controltopic):
            results.set_node_output(node, success, "\n".join(args) if success else args)

        return results


    def _query_netstats(self, nodes):
        """Send a "Control::net_stats_request" event to every running node.

        Returns whatever events.send_events_parallel() returns for the
        requests; nodes that are not running are left out.
        """
        requests = [
            (node, "Control::net_stats_request", [], "Control::net_stats_response")
            for (node, isrunning) in self._isrunning(nodes)
            if isrunning
        ]

        return events.send_events_parallel(requests, config.Config.controltopic)

    def peerstatus(self, nodes):
        """Collect the peer status reported for the given nodes.

        Returns a CmdResult with one output entry per node answered by
        _query_peerstatus(): the first response argument (or "" if there
        is none) on success, the raw failure value otherwise.  If nothing
        was recorded at all, a single failure is recorded for the first
        node.
        """
        results = cmdresult.CmdResult()

        for (node, success, args) in self._query_peerstatus(nodes):
            if not success:
                out = args
            elif args:
                out = args[0]
            else:
                out = ""

            results.set_node_output(node, success, out)

        if not results.nodes:
            results.set_node_output(nodes[0], False, "no running instances of Zeek")

        return results

    def netstats(self, nodes):
        """Collect the packet statistics reported for the given nodes.

        Returns a CmdResult with one output entry per node answered by
        _query_netstats(): the first response argument with surrounding
        whitespace stripped (or "" if there is none) on success, the raw
        failure value otherwise.  If nothing was recorded at all, a single
        failure is recorded for the first node.
        """
        results = cmdresult.CmdResult()

        for (node, success, args) in self._query_netstats(nodes):
            if success:
                out = args[0].strip() if args else ""
            else:
                out = args

            results.set_node_output(node, success, out)

        if not results.nodes:
            results.set_node_output(nodes[0], False, "no running instances of Zeek")

        return results

    def process(self, trace, zeek_options, zeek_scripts):
        """Run Zeek offline on a trace file with the zeekctl configuration.

        The run uses the parameters of the first node (standalone) or the
        first worker (cluster) and takes place in a freshly created
        "testing" directory below the configured tmpdir.  'zeek_options' and
        'zeek_scripts' are lists of extra command-line options and scripts.

        Returns a CmdResult whose 'ok' flag is False if the trace file is
        missing, the working directory cannot be prepared, or the command
        fails.
        """
        results = cmdresult.CmdResult()

        if not os.path.isfile(trace):
            self.ui.error("trace file not found: %s" % trace)
            results.ok = False
            return results

        node = self.config.nodes()[0] if self.config.standalone else self.config.workers()[0]

        cwd = os.path.join(self.config.tmpdir, "testing")

        # Always start from an empty working directory.
        if os.path.isdir(cwd):
            try:
                shutil.rmtree(cwd)
            except OSError as err:
                self.ui.error("cannot remove directory: %s" % err)
                results.ok = False
                return results

        try:
            os.makedirs(cwd)
        except OSError as err:
            self.ui.error("cannot create directory: %s" % err)
            results.ok = False
            return results

        env = _make_env_params(node)

        # User options first, then the node's parameters (non-live), then
        # the process-trace script and any extra scripts.
        arg_parts = [" ".join(zeek_options + _make_zeek_params(node, False)),
                     "zeekctl/process-trace"]
        if zeek_scripts:
            arg_parts.append(" ".join(zeek_scripts))
        zeek_args = " ".join(arg_parts)

        runner = os.path.join(self.config.scriptsdir, "run-zeek-on-trace")
        cmd = runner + " %s %s %s %s" % (0, cwd, trace, zeek_args)

        self.ui.info(cmd)

        success, output = execute.run_localcmd(cmd, env=env)
        if not success:
            results.ok = False

        self.ui.info(output)
        self.ui.info("### Zeek output in %s" % cwd)

        return results

    def install(self, local_only):
        """Install the zeekctl configuration locally and on the remote hosts.

        Locally this records the Zeek version, recreates the site policy
        directories, copies the site policies, generates the zeekctl policy
        and shell configuration files and updates the "current" log symlink.
        Unless 'local_only' is true, the required directories are then
        created on every remote host and the installation is synced to them.

        Returns a CmdResult whose 'ok' flag is False as soon as any step
        fails; the remaining steps are skipped in that case.
        """
        results = cmdresult.CmdResult()
        cfg = self.config
        ui = self.ui

        try:
            cfg.record_zeek_version()
        except config.ConfigurationError as err:
            ui.error("%s" % err)
            results.ok = False
            return results

        manager = cfg.manager()

        # Delete previously installed policy files to not mix things up.
        policies = [cfg.policydirsiteinstall, cfg.policydirsiteinstallauto]

        for old_dir in policies:
            if not os.path.isdir(old_dir):
                continue

            ui.info("removing old policies in %s ..." % old_dir)
            try:
                shutil.rmtree(old_dir)
            except OSError as err:
                ui.error("failed to remove directory %s: %s" % (old_dir, err))
                results.ok = False
                return results

        ui.info("creating policy directories ...")
        for new_dir in policies:
            try:
                os.makedirs(new_dir)
            except OSError as err:
                ui.error("failed to create directory: %s" % err)
                results.ok = False
                return results

        # Install local site policy.
        if cfg.sitepolicypath:
            ui.info("installing site policies ...")
            dst = cfg.policydirsiteinstall
            for entry in cfg.sitepolicypath.split(":"):
                srcdir = cfg.subst(entry)
                for pathname in glob.glob(os.path.join(srcdir, "*")):
                    if not execute.install(pathname, dst, ui):
                        results.ok = False
                        return results

        # Generate the automatically created policy files.
        if not install.make_layout(cfg.policydirsiteinstallauto, ui):
            results.ok = False
            return results

        ui.info("generating local-networks.zeek ...")
        if not install.make_local_networks(cfg.policydirsiteinstallauto, ui):
            results.ok = False
            return results

        ui.info("generating zeekctl-config.zeek ...")
        if not install.make_zeekctl_config_policy(cfg.policydirsiteinstallauto, ui, self.pluginregistry):
            results.ok = False
            return results

        # Point the "current" symlink at the working directory of the first
        # logger that is defined, or of the manager if there is no logger.
        loggers = cfg.loggers()
        node_cwd = loggers[0].cwd() if loggers else manager.cwd()

        current = cfg.subst(os.path.join(cfg.logdir, "current"))
        try:
            util.force_symlink(node_cwd, current)
        except (IOError, OSError) as err:
            results.ok = False
            ui.error("failed to update symlink '%s': %s" % (current, err))
            return results

        ui.info("generating zeekctl-config.sh ...")
        if not install.make_zeekctl_config_sh(ui):
            results.ok = False
            return results

        if local_only:
            return results

        # Make sure we install each remote host only once.
        nodes = cfg.hosts(exclude_local=True)

        # If there are no remote hosts, then we're done.
        if not nodes:
            # Save current configuration state.
            cfg.update_cfg_hash()
            return results

        # Sync to clients.
        ui.info("updating nodes ...")

        dirs = []

        if cfg.havenfs:
            # NFS. We only need to take care of the spool/log directories,
            # and the log directory is needed only on the manager.
            dirs.append((manager, cfg.logdir))
            syncs = install.get_nfssyncs()
        else:
            # Non-NFS, need to explicitly synchronize.
            syncs = install.get_syncs()

        # Drop optional entries whose path does not exist locally.
        syncs = [(path, mirror) for (path, mirror, optional) in syncs
                 if not optional or os.path.exists(cfg.subst(path))]

        # Entries that are not mirrored only need to exist on each host.
        createdirs = [cfg.subst(path) for (path, mirror) in syncs if not mirror]
        dirs.extend((host, path) for host in nodes for path in createdirs)

        for (node, success, output) in self.executor.mkdirs(dirs):
            if success:
                continue

            ui.error("cannot create a directory on node %s" % node.name)
            if output:
                ui.error(output)
            results.ok = False
            return results

        paths = [cfg.subst(path) for (path, mirror) in syncs if mirror]
        if not execute.sync(nodes, paths, ui):
            results.ok = False
            return results

        # Save current configuration state.
        cfg.update_cfg_hash()

        return results


    # Triggers all activity which is to be done regularly via cron.
    def cron(self, watch):
        """Run all the activity that is to be done regularly via cron.

        Does nothing if cron is disabled in the configuration or if
        "zeekctl install" has not been run yet.  When 'watch' is true, nodes
        whose running state differs from the expected state are started or
        stopped first.  Any output produced by the cron tasks is buffered
        and sent by mail at the end.
        """
        if not self.config.cronenabled:
            logging.debug("cron is disabled")
            return

        # Check if "zeekctl install" has been run.  Stay silent if not,
        # otherwise the cron job may generate emails before the user has a
        # chance to do "zeekctl install".
        if not self.config.is_zeekctl_installed():
            return

        cronui = cron.CronUI()
        tasks = cron.CronTasks(cronui, self.config, self, self.executor, self.pluginregistry)

        cronui.buffer_output()

        if watch:
            # Check if node state matches expected state, and start/stop if
            # necessary.
            to_start = []
            to_stop = []
            for (node, isrunning) in self._isrunning(self.config.nodes()):
                expected = node.getExpectRunning()

                if isrunning:
                    if not expected:
                        to_stop.append(node)
                elif expected:
                    to_start.append(node)

            if to_start:
                self.start(to_start)
            if to_stop:
                self.stop(to_stop)

        # Periodic tasks, in order: dead hosts, statistics, disk space,
        # old log files, old crash directories, HTTP stats directory.
        tasks.check_hosts()
        tasks.log_stats(5)
        tasks.check_disk_space()
        tasks.expire_logs()
        tasks.expire_crash()
        tasks.update_http_stats()

        # Run external command if we have one.
        tasks.run_cron_cmd()

        # Mail potential output, using its first line in the subject.
        output = cronui.get_buffered_output()
        if output:
            subject = "cron: " + output.splitlines()[0]
            success, out = self._sendmail(subject, output)
            if not success:
                self.ui.error("zeekctl cron failed to send mail: %s" % out)
                self.ui.info("Output of zeekctl cron:\n%s" % output)

        logging.debug("cron done")

